<?php
// +-------------------------------------------------------------------
// | 
// +-------------------------------------------------------------------
// | Copyright (c) 2009-2016 All rights reserved.
// +-------------------------------------------------------------------
namespace Kcdns\Service\Mqueue;

/**
 * Database-backed message queue for deferred (asynchronous) callbacks.
 *
 * Each task is stored as one row of the `mqueue` model. The `status` column
 * takes one of the following values:
 *   0: pending; 1: processing; 2: done; 3: failed.
 */
class Mqueue
{

    /**
     * Shared instance returned by getInstance().
     * (The misspelled name is kept because the property is publicly reachable.)
     */
    public static $instace;

    /**
     * Model object for the `mqueue` table, obtained from M().
     */
    protected $db = null;

    /**
     * Return the shared instance, creating it on first use.
     *
     * @return self
     */
    public static function getInstance ()
    {
        if (! self::$instace)
        {
            self::$instace = new self();
        }
        return self::$instace;
    }

    public function __construct ()
    {
        $this->db = M('mqueue');
    }

    /**
     * Publish an asynchronous task.
     *
     * The callback and its parameter are stored as JSON, so the callback must
     * survive a JSON round trip: a function name / "Class::method" string, or
     * an array of two strings (class name, method name). Closures and
     * object-bound callables are rejected because they cannot be restored
     * when the task is executed later.
     *
     * @param string|array $callback callable to invoke when the task runs
     * @param mixed $param single argument passed to the callback
     * @param number $queue queue identifier
     * @return mixed result of the model's add() call
     * @throws \Exception when the callback is invalid or the message cannot be encoded
     */
    public function send ($callback = '', $param = '', $queue = 0)
    {
        filter($queue, 'required;int');
        
        if (! is_callable($callback) || ! $this->_isSerializableCallback($callback))
        {
            throw new \Exception('Invalid callback');
        }
        
        $message = json_encode(compact('callback', 'param'));
        if ($message === false)
        {
            // e.g. $param contains invalid UTF-8; do not store an empty message.
            throw new \Exception('Unable to encode message (json error ' . json_last_error() . ')');
        }
        
        $data = array(
                'queue' => $queue,
                'mark' => md5($message),
                'status' => 0,
                'create_time' => date('Y-m-d H:i:s'),
                'message' => $message
        );
        return $this->db->data($data)->add();
    }

    /**
     * Run the pending (status 0) tasks of a queue one after another, lowest id first.
     *
     * An exception raised while reading or running stops the loop and is
     * counted as one failure.
     *
     * @param number $queue queue identifier
     * @return string summary with success / failure counts, or '-' when nothing was processed
     */
    public function handle ($queue = 0)
    {
        $sc = 0;
        $ec = 0;
        try
        {
            while ($record = $this->_read(array(
                    'queue' => $queue,
                    'status' => 0
            )))
            {
                if ($this->_run($record))
                {
                    $sc ++;
                }
                else
                {
                    $ec ++;
                }
            }
        }
        catch (\Exception $e)
        {
            // Record why the loop stopped instead of dropping the exception silently.
            \KCSLog::WARN($e->getMessage());
            $ec ++;
        }
        return $sc||$ec?"【{$sc}】成功 / 【{$ec}】失败":'-';
    }

    /**
     * Run one specific task that has not completed successfully yet.
     *
     * @param int $messageId id of the queue record
     * @return boolean true when the callback ran without failing
     * @throws \Exception when the record does not exist or is already done
     */
    public function execute ($messageId)
    {
        $record = $this->_read(array(
                'id' => $messageId
        ));
        if (! $record)
        {
            throw new \Exception('Message not found: ' . $messageId);
        }
        if ($record['status'] == 2)
        {
            throw new \Exception('Done at ' . $record['done_time']);
        }
        return $this->_run($record);
    }

    /**
     * Read the first message (lowest id) matching the given conditions.
     *
     * When the message is still pending (status 0) it is flagged as
     * processing (status 1) and its read time is stored. The returned record
     * holds the values as they were before that update.
     *
     * @param array $where conditions; the keys id, queue and status are validated through filter()
     * @return array|false the record, or false when nothing matches
     * @throws \Exception presumably raised by filter() on invalid conditions - TODO confirm
     */
    protected function _read ($where)
    {
        filter($where, array(
                'id' => 'int',
                'queue' => 'int',
                'status' => 'enum=0,1,2,3'
        ));
        
        $order = "id ASC";
        $record = $this->db->where($where)->order($order)->find();
        if (! $record)
        {
            return false;
        }
        
        // Pending message: store the read time and switch the status to "processing".
        if ($record['status'] == 0)
        {
            $data = array(
                    'status' => 1,
                    'read_time' => date('Y-m-d H:i:s')
            );
            $this->db->where(array(
                    'id' => $record['id']
            ))->data($data)->save();
        }
        
        return $record;
    }

    /**
     * Execute the callback stored in a record and persist the outcome.
     *
     * The task fails (status 3) when the callback is not callable, throws an
     * exception, or returns exactly false. Any other return value - including
     * null, 0 or '' - counts as success (status 2).
     *
     * @param array $record queue record with at least `id` and `message`
     * @return boolean true on success, false on failure (matches the stored status)
     */
    protected function _run ($record)
    {
        $task = json_decode($record['message'], true);
        if (! is_array($task))
        {
            // Malformed message: treat it as a task without a callback.
            $task = array();
        }
        $callback = isset($task['callback']) ? $task['callback'] : null;
        $param = isset($task['param']) ? $task['param'] : null;
        
        $succeeded = false;
        
        if (! is_callable($callback))
        {
            // Invalid callback.
            $data = array(
                    'status' => 3,
                    'error' => '[' . $this->_describeCallback($callback) . '] is not callable'
            );
        }
        else
        {
            $error = 'return false';
            try
            {
                $result = call_user_func($callback, $param);
            }
            catch (\Exception $e)
            {
                $result = false;
                $error = $e->getMessage();
                \KCSLog::WARN($error);
                \KCSLog::DEBUG($e);
            }
            
            if ($result === false)
            {
                $data = array(
                        'status' => 3,
                        'error' => $error
                );
            }
            else
            {
                $succeeded = true;
                $data = array(
                        'status' => 2,
                        'done_time' => date('Y-m-d H:i:s')
                );
            }
        }
        
        $this->db->where(array(
                'id' => $record['id']
        ))->data($data)->save();
        
        // Return the same verdict that was stored, not a truthiness cast of the
        // callback's return value.
        return $succeeded;
    }

    /**
     * Tell whether a callable can be stored as JSON and restored later.
     *
     * @param mixed $callback
     * @return boolean true for strings and for arrays of exactly two strings
     */
    private function _isSerializableCallback ($callback)
    {
        if (is_string($callback))
        {
            return true;
        }
        return is_array($callback)
            && count($callback) === 2
            && isset($callback[0], $callback[1])
            && is_string($callback[0])
            && is_string($callback[1]);
    }

    /**
     * Build a printable label for a callback, for use in error messages.
     *
     * @param mixed $callback
     * @return string the callback itself when it is a string, '' when missing, its JSON form otherwise
     */
    private function _describeCallback ($callback)
    {
        if (is_string($callback))
        {
            return $callback;
        }
        if ($callback === null)
        {
            return '';
        }
        return (string) json_encode($callback);
    }
}